[FLINK-30979] Support shuffling data by partition#522
Merged
JingsongLi merged 4 commits intoFeb 14, 2023
Conversation
JingsongLi
reviewed
Feb 11, 2023
| public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; | ||
|
|
||
| public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION = | ||
| ConfigOptions.key("sink.shuffle-by-partition.enable") |
Contributor
There was a problem hiding this comment.
If the name is the same as that of Flink, will it lead to another useless partition shuffle? See Flink StreamPhysicalSinkRule.
Contributor
Author
There was a problem hiding this comment.
Thanks @JingsongLi I have rebased from master and rename the config name
cbdaa03 to
7aa53c2
Compare
JingsongLi
reviewed
Feb 13, 2023
| <td>Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>sink-store.shuffle-by-partition.enable</h5></td> |
Contributor
There was a problem hiding this comment.
Maybe we can document this in Writing Tables page?
| <td>Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td> | ||
| </tr> | ||
| <tr> | ||
| <td><h5>sink-store.shuffle-by-partition.enable</h5></td> |
Contributor
There was a problem hiding this comment.
just sink.partition-shuffle?
Contributor
Author
|
@JingsongLi DONE |
Contributor
Author
|
The failed test case is a known problem and I have created an issue https://issues.apache.org/jira/browse/FLINK-31039 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Currently sink operator in flink will shuffle data by bucket id, which cause data skew when there is only 1 bucket with multiple partitions in the table. This PR aims to support shuffling data by bucket id and partition when
sink.shuffle-by-partition.enableis set.The main changes are
sink.shuffle-by-partition.enableto support shuffling data by partitionPartitionComputerto get partition from row dataBucketStreamPartitionerThe main tests are
FileStoreShuffleBucketTestto shuffle data by bucket and partition